前面之所以會需要設計從 OpenTelemetry Collector、API Gateway、Lambda、Firehose 一直到 S3 Table 之間的 data pipeline,其中一個原因便是目前的 OTel collector 尚沒有專門針對 Apache Iceberg 或者 S3 Table 的 exporter。
其實,我們也可以實作自己的 exporter,讓 OTLP 資料可以順利的寫入 Data Lakehouse。但是實作一個Lakehouse exporter,需要解決哪些技術挑戰?現有的 exporter 設計能給我們什麼啟發? 今天我們選擇 ClickHouse Exporter 作為案例研究。選擇它有兩個原因:
因此今天我們將透過 ClickHouse Exporter 的原始碼分析,來了解 ClickHouse Exporter 是怎麼統一管理不同類型的 signal、OTLP 如何轉換成目標系統的 schema、以及 schama的定義如何影響查詢效能。
了解之後,不管是日後在優化原有 pipeline 的資料轉換,或者要自己客製化一個 exporter,都有助於打下基礎的概念。
從 ClickHouse Exporter 的設定檔來看,我們會需要指定各個 signals 所放置的 table,如果所指定的 database 和 table 不存在,exporter 便會自動建立對應的 database 和 table。
receivers:
examplereceiver:
exporters:
clickhouse:
endpoint: tcp://127.0.0.1:9000?dial_timeout=10s
database: otel
async_insert: true
create_schema: true
logs_table_name: otel_logs
traces_table_name: otel_traces
metrics_tables:
gauge:
name: "otel_metrics_gauge"
sum:
name: "otel_metrics_sum"
summary:
name: "otel_metrics_summary"
histogram:
name: "otel_metrics_histogram"
exponential_histogram:
name: "otel_metrics_exp_histogram"
這背後便是透過 Factory Pattern 來實作。可以在factory.go中看到:
func NewFactory() exporter.Factory {
return exporter.NewFactory(
metadata.Type,
createDefaultConfig,
exporter.WithLogs(createLogsExporter, metadata.LogsStability),
exporter.WithTraces(createTracesExporter, metadata.TracesStability),
exporter.WithMetrics(createMetricExporter, metadata.MetricsStability),
)
}
可以看到 Logs/Traces/Metrics 分別使用不同的函數(createLogsExporter
, createTracesExporter
, createMetricExporter
)來建立,這可以讓 exporter 可以選擇性支援某些 signal,例如說,某個 exporter 只想支援 traces,就只需要加上 WithTraces 即可。
每個 signal 都有對應的建立函式,我們來看看 Metrics 的函式 createMetricExporter
:
func createMetricExporter(
ctx context.Context,
set exporter.Settings,
cfg component.Config,
) (exporter.Metrics, error) {
c := cfg.(*Config)
c.collectorVersion = set.BuildInfo.Version
exp := newMetricsExporter(set.Logger, c)
return exporterhelper.NewMetrics(
ctx,
set,
cfg,
exp.pushMetricsData,
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
這裡又出現了一層 Functional Options Pattern。exporterhelper.NewMetrics()
是 OpenTelemetry Collector 核心框架提供的函式,它接收:
ClickHouse 專屬的邏輯(由 exporter 開發者實作)
exp.pushMetricsData
:如何將 OTLP metrics 轉換並寫入 ClickHouseexp.start
:建立資料庫連線、建立 table schemaexp.shutdown
:優雅關閉連線,確保資料寫入完成通用的可靠性機制(由框架統一提供)
WithTimeout
:防止單次操作耗時過長WithQueue
:批次累積資料,減少網路往返WithRetry
:網路失敗時自動重試,搭配 exponential backoff而exporterhelper.NewMetrics()
的任務則是將上面兩個部分給組合起來。這樣的好處是,exporter 的開發者可以專注在第一部分的資料轉換邏輯,不必在客製化的 exporter 上重複實作 queue、retry 等機制。
由於這些可靠性機制是所有 exporter 共通的需求,OpenTelemetry Collector 將它們統一實作在 exporterhelper 套件中,客製化的 exporter 都使用這個套件來獲得一致的可靠性保障。
在 Factory Pattern 中,我們看到 WithStart
和 WithShutdown
兩個選項,它們主要負責管理 exporter 的生命週期。
OpenTelemetry 框架定義了兩個標準的type
type c func(context.Context, Host) error
type ShutdownFunc func(context.Context) error
每個 exporter 都需要實作符合這兩個型別的方法,然後透過 WithStart()
和 WithShutdown()
將這些方法傳給框架。當 Collector 啟動或關閉時,框架會在適當的時機呼叫這些方法。
當 Collector 啟動時,框架會呼叫 start() 方法來初始化 exporter。以 metrics exporter 為例:
// 來源:exporter_metrics.go
func (e *metricsExporter) start(ctx context.Context, _ component.Host) error {
metrics.SetLogger(e.logger)
// 1. 建立 ClickHouse 連線配置
opt, err := e.cfg.buildClickHouseOptions()
if err != nil {
return err
}
// 2. 建立資料庫連線
e.db, err = internal.NewClickhouseClientFromOptions(opt)
if err != nil {
return err
}
// 3. 如果設定允許,自動建立 database 和 table
if e.cfg.shouldCreateSchema() {
database := e.cfg.database()
clusterStr := e.cfg.clusterString()
if err := internal.CreateDatabase(ctx, e.db, database, clusterStr); err != nil {
return err
}
ttlExpr := internal.GenerateTTLExpr(e.cfg.TTL, "toDateTime(TimeUnix)")
err := metrics.NewMetricsTable(ctx, e.tablesConfig, database, clusterStr, e.cfg.tableEngineString(), ttlExpr, e.db)
if err != nil {
return err
}
}
return nil
}
start()
方法做了三件事:
當 Collector 收到關閉信號(例如 SIGTERM)時,框架會呼叫 shutdown():
func (e *metricsExporter) shutdown(_ context.Context) error {
if e.db != nil {
return e.db.Close()
}
return nil
}
shutdown()
方法負責釋放資源,主要是關閉資料庫連線。這裡有個重要的細節:在 shutdown() 被呼叫前,框架會先等待 queue 中的資料都處理完畢,確保沒有資料遺失。
這種做法也叫做 gracefully shutdown,當伺服器收到終止的指令後,如果手上還有正在執行的process,它會先處理完,之後才會真的關閉服務,這麼作不僅可以保障資料的一致與完整性,我們也不需要害怕突然中止程式可能會導致非預期的錯誤。
今天從 ClickHouse Exporter 的設定檔作為開頭,來了解 exporter 是如何透過工廠模式來管理不同類型的 signal,並且了解各個 exporter 是如何管理其生命週期。明天,讓我們繼續透過原始碼,來了解 exporter 的資料處理流程。
[Go 教學] 什麼是 graceful shutdown?
OpenTelemetry Docs - Exporters
opentelemetry-collector/exporter/exporterhelper